/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.indices.pollingingest.mappers;

import org.opensearch.common.util.RequestUtils;
import org.opensearch.index.IngestionShardPointer;
import org.opensearch.index.Message;
import org.opensearch.indices.pollingingest.IngestionUtils;
import org.opensearch.indices.pollingingest.ShardUpdateMessage;

import java.util.Map;

import static org.opensearch.action.index.IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
import static org.opensearch.indices.pollingingest.MessageProcessorRunnable.ID;

/**
 * Default mapper implementation that expects messages in the format:
 * {
 *   "_id": "document_id",
 *   "_op_type": "index|create|delete",
 *   "_version": "external document version"
 *   "_source": { ... document fields ... }
 * }
 *
 * <p> Document ID will be auto-generated if not present in the incoming message.
 */
public class DefaultIngestionMessageMapper implements IngestionMessageMapper {

    @Override
    public ShardUpdateMessage mapAndProcess(IngestionShardPointer pointer, Message message) throws IllegalArgumentException {
        Map<String, Object> payloadMap = IngestionUtils.getParsedPayloadMap((byte[]) message.getPayload());

        // Extract or generate _id
        long autoGeneratedIdTimestamp = UNSET_AUTO_GENERATED_TIMESTAMP;
        if (payloadMap.containsKey(ID) == false) {
            String id = RequestUtils.generateID();
            payloadMap.put(ID, id);
            autoGeneratedIdTimestamp = System.currentTimeMillis();
        }

        return new ShardUpdateMessage(pointer, message, payloadMap, autoGeneratedIdTimestamp);
    }
}
